In [1]:
import numpy as np
from matplotlib import pyplot as plt
from __future__ import division
from itertools import product
%matplotlib inline

General 1-D CA class


In [111]:
class CA(object):
    '''
    1-D cellular automaton object    
            
    '''
    def __init__(self, rule, alphabet_size, radius, initial_condition):
        
        '''
        Initializes the CA.       
    
        Parameters
        ----------

        rule: int 
            Defines mapping from current neighborhood to next state. Generalized of Wolfram ECA rule numbering.
            Use max_rule(alphabet_size, radius) function to get maximum rule number allowed for given
            alphabet size and radius.
            
        alphabet_size: int
            Number of local states for each site on CA. 
            
        radius: int
            Size of neighborhood used for update rule. Neighborhood size is 2*radius +1.

        initial_condition: array_like
            Initializes initial condition for ECA.
            

        Instance Variables 
        ------------------
        self._initial:
            Stored initial conditions.

        self._spacetime:
            Array of spacetime values. Initialized to initial_condition. Space is in horizontal direction, 
            time is vertical. 

        self._rule:
            CA lookup table rule. Initialized to rule.
            
        self._alphabet:
            Local state alphabet size. Initialized to alphabet_size
            
        self._radius:
            Neighborhood radius. Initialized to radius. 

        self._table:
            Hypercube array for CA lookup table using CA rule number. Neighborhoods represented as
            coordinates on hypercube, and value at the coordinate is the corresponding mapping of that 
            neighborhood.

        self._transduced:
            Array of transduced spacetime data (i.e. output of specific transducer run over raw spacetime 
            data).
            Initialized to all zeros. 

        self._filtered:
            Array of filtered spacetime data. All domain sites are 0 and all non-domain sites are 1. 
            Initialized to all zeros. 
            
        '''
        
        size = np.size(initial_condition)
        self._initial = initial_condition
        self._spacetime = initial_condition
        self._rule = rule
        self._alphabet = alphabet_size
        self._radius = radius
        self._table = lookup_table(rule, alphabet_size, radius)
        self._transduced = np.zeros(size, dtype = int)
        self._masked = np.zeros(size, dtype = int)
        self._filtered = np.zeros(size, dtype = int)
        

                    
        
 
    def evolve(self, time):
        '''
         Evolves the CA for specified number of steps. 
        
        Parameters
        ---------
        time: int
            Number of time steps to evolve the CA.
            
        Updates
        -------
        self._spacetime
            Adds new spacetime data to array of spacetime data.
            
        '''
        T = time
        #Initial conditions are 1-D arrays but spacetime data is 2D array, so must treat each case
        #separately. The first case is for when CA is first initialzed or reset and spacetime is 
        #equal to initial condition, and thus only 1-D. 
        if len(np.shape(self._spacetime)) == 1:
            size = np.size(self._spacetime)
            #get neighborhood values
            neighbors = neighborhood(self._spacetime, self._radius)
            #initialize array for new configuration
            new = np.zeros(size, dtype = int)   
            #use lookup table to set values for new configuration
            new = self._table[neighbors]
            #stack new configuration into spacetime data
            self._spacetime = np.vstack((self._spacetime, new)) 
            #now that spacetime is 2-D, time loop will run so remove a time step
            T = time -1
            
        #time step loop for 2-D spacetime array
        rows, columns  = np.shape(self._spacetime)
        for t in xrange(T):
            #get array of current neighborhood values
            neighbors = neighborhood(self._spacetime[rows - 1], self._radius)
            #initialize array for new configuration string
            new = np.zeros(columns, dtype = int)
            #run the current neighborhood values through the lookup table to get next configuration
            new = self._table[neighbors]
            #add new configuration values to spacetime array
            self._spacetime = np.vstack((self._spacetime, new)) 
            #increase number of rows since this is initialized outside of time loop
            rows += 1
            
    
    
    def reset(self, new_state = None):
        '''
        Resets the spacetime data back to the given state or original initial conditions. 
        
        Parameters
        ----------
        new_state: array_like
            Configuration of new initial conditions to reset the CA to. 
            If None, CA is returned to original initial conditions.
        '''
        if new_state is None:
            #reset spacetime data to the initial conditions
            self._spacetime = np.copy(self._initial)
        else:
            self._spacetime = np.copy(new_state)
        


    def rewind(self, steps):
        '''
        Rewind spacetime data the given number of steps from current configuration
        '''
        rows, columns = np.shape(self._spacetime)
        self._spacetime = self._spacetime[:rows - steps]
        
        
        
    def get_spacetime(self):
        return self._spacetime
    
    
    
    def current_state(self):
        '''
        Returns current configuration of the CA.
        '''
        if len(np.shape(self._spacetime)) == 1:
            return self._spacetime
        else:
            rows, columns = np.shape(self._spacetime)
            return self._spacetime[rows - 1]
    
    
    
    def lookup_table(self, full = False):
        '''
        Provides the lookup table for the CA.
        
        Parameters
        ----------
        full: bool
            If True, will print full lookup table in lexicographic order. Best if used for ECAs and other small 
            neighborhood size rules. 
            
            If False, returns the lookup table as a dict. 
        
        '''
        #simplify variable names. R = neighborhood size
        A = self._alphabet
        R = 2*self._radius + 1
        scan = tuple(np.arange(0,A)[::-1])
        if full:
            for a in product(scan, repeat = R):
                print a , self._table[a]
            
            return None
        else:
            lookup = {a : self._table[a] for a in product(scan, repeat = R)}
            return lookup
    
    
    def perturb(self, N_flips):
        '''
        Randomly flips specified number of bits on current spatial configuration of CA.
        
        Parameters
        ----------
        
        N_flips: int
            Number of sites on spatial configuration to flip
            
        Updates
        -------
        self._spacetime
            Last row of spacetime array (current configuration) has 'N_flips' bits randomly selected 
            and flipped.             
            
        '''        
        
        rows, columns  = np.shape(self._spacetime)
        #indices for current configuration array
        indices = np.arange(0, columns, dtype = int)
        #array to keep track of location of bit flips so a single site is not flipped multiple times
        flip_check = np.zeros(columns)
        count = 0
        #loop to flip bits while making sure to not flip bits more than once
        while count < N_flips:
            flip = np.random.choice(indices)
            if flip_check[flip] == 0:
                flip_check[flip] += 1
                count += 1
                self._spacetime[rows -1][flip] = -self._spacetime[rows -1][flip] + 1


            
            
    def transduce(self, t, direction = 'right'):
        '''
        Uses given transducer to scan spacetime data and record output to instance variable
        self._transduced. Result can be displayed with CA.diagram('transduced') method.
        
        Parameters
        ----------
        t : tuple(d, start_state, synch)
            d: dict 
                Mapping current machine state and input symbol to next machine state
                and output symbol.
                
            start_state: str
                Start state for the transducer.
                
            synch: int
                Number of transient steps need for transducer to synchronize to input data.
                
        '''
        
        #reset / initialize self._transduced to correct shpae
        rows, columns = np.shape(self._spacetime)
        self._transduced = np.zeros((rows, columns), dtype = int)
        #initialize transducer object with given transducer input
        machine = transducer(*t)
        #scan each row of spacetime data with transducer
        for r in xrange(rows):
            self._transduced[r] = machine.scan(self._spacetime[r], direction)
            


            
        
    def mask(self, t, direction ='right'):
        '''
        Uses given transducer to mask spacetime data and stores output to instance variable
        self._masked. Result can be displayed with CA.diagram('masked') method.
        
        Parameters
        ----------
        t: transducer tuple (d, start_state, synch)
            d: dict
                Mapping of current machine state and input symbol to next machine state
                and output symobl.
                
            start_state: str
                Start state for the transducer.
                
            synch: int
                Number of transient steps needed for transducer to synchronize to input data.
        '''
        rows, columns = np.shape(self._spacetime)
        self._masked = np.zeros((rows, columns), dtype = int)
        machine = transducer(*t)
        for r in xrange(rows):
            self._masked[r] = machine.mask(self._spacetime[r], self._alphabet, direction)
    

            
    
    
    def domain_filter(self, t, fill = False):
        '''
        Uses given transducer to filter spacetime data and stores output to instance variable
        self._filtered. Result can be displayed with CA.diagram('filtered') method.
        
        Parameters
        ----------
        t: transducer tuple (d, start_state, synch)
            d: dict
                Mapping of current machine state and input symbol to next machine state
                and output symobl.
                
            start_state: str
                Start state for the transducer.
                
            synch: int
                Number of transient steps needed for transducer to synchronize to input data.
                
        fill: bool
            Set to True if left and right transducer scans are different; fill == True will combine and fill-in
            scans from both directions. If False, will set self._filtered to simplified transducer output 
            with 0 domain and 1 for not domain. 
        '''
        rows, coluns = np.shape(self._spacetime)
        self._filtered = np.zeros((rows, coluns), dtype = int)
        machine = transducer(*t)
        for r in xrange(rows):
            self._filtered[r] = machine.domain_filter(self._spacetime[r], fill)
        
        
        
        
  
    
    def diagram(self, option = None, size = None):
        '''
        Plots raw, transduced, or filtered spacetime diagram.
        
        Parameters 
        ----------
        option: 'spacetime' , 'transduced', or 'filtered'
            Input to chose which spacetime data to plot. If 'None', plots raw spacetime data.
            
        size: float
            Deteremines figure size.
            
        Returns
        -------
        Matplotlib imshow of self._spacetime, self._transduced, or self._filtered depending on 'option' input
        
        '''
        
    
        if size == None:
            s = 15
        else:
            s = size
            
            
        if option == 'spacetime' or option is None:
            plt.figure(figsize=(s,s))
            plt.imshow(self._spacetime, cmap=plt.cm.bone, interpolation='nearest') 
            plt.show()
        elif option == 'transduced':
            plt.figure(figsize=(s,s))
            plt.imshow(self._transduced, cmap=plt.cm.bone, interpolation='nearest')
            plt.show()
        elif option == 'filtered':
            plt.figure(figsize=(s,s))
            plt.imshow(self._filtered, cmap=plt.cm.Blues, interpolation='nearest')
            plt.show()
        elif option == 'masked':
            plt.figure(figsize=(s,s))
            plt.imshow(self._masked, cmap=plt.cm.bone, interpolation='nearest')
            plt.show()

In [112]:
class ECA(CA):
    '''
    Elementary cellular automata subclass. Same as CA subclass but with alphabet size of 2 and 
    neighborhood radius 1. 
    '''
    alphabet_size = 2
    radius = 1
    
    def __init__(self, rule, initial_condition):
        '''
        Initializes the ECA.
        
        Parameters
        ----------
        rule: int between 0 and 255
            Defines the lookup table using Wolfram lexicographic numbering scheme.
            
        initial_condition: array_like
            Initial configuration for the ECA.
        '''
        super(ECA, self).__init__(rule, ECA.alphabet_size, ECA.radius, initial_condition)

Support code for CA class


In [4]:
def max_rule(alphabet, radius):
    '''
    Returns the maximum rule number for a given alphabet size and neighborhood radius.
    
    Parameters
    ----------
    alphabet: int
        Alphabet size for the CA.
        
    radius: int
        Neighborhood radius length for the CA.
        
    Returns
    -------
    rule number: int
        The maximum allowed rule number for the given alphabet size and radius.
    '''
    rule = 0
    #R = neighborhood size
    R = 2*radius + 1
    #add up max possible output for every neighborhood 
    for i in xrange(alphabet ** R):
        rule += (alphabet - 1) * (alphabet)**i

    return rule

Need to fix int2base to produce arbitrarily large number. Currently cut it down to necessary size, but can't increase it up


In [5]:
import string
digs = string.digits + string.letters

def int2base(x, base, radius):
    if x < 0: sign = -1
    elif x == 0: return digs[0]
    else: sign = 1
    x *= sign
    digits = []
    while x:
        digits.append(digs[int(x % base)])
        x /= base
    if sign < 0:
        digits.append('-')
    digits.reverse()
    result = ''.join(digits)
    leng = len(result)
    R = base ** (2*radius + 1)
    
    #return result
    return result[leng - R:]

In [6]:
test = int2base(3838393, 3,4)
print len(test)


693

In [7]:
def lookup_table(rule, alphabet_size, radius):
    '''
    Produces the lookup table for the specified CA rule of the given alphabet size and radius.
    
    Parameters
    ----------
    rule: int
        CA rule number that defines the lookup table. For neighborhood in lexicographic order, the 
        corresponding CA update rule mapping is added up in the base of the alphabet size. This is the 
        rule number.
        
    alphabet_size: int
        Number of local site states available to the CA.
        
    radius: int
        Radius of the neighorhood used for the update rule. For radius R, neighborhood size is 2R+1. 
        
    
    Returns
    -------
    table: ndarray
        Multidimensional array representation of lookup table as a hypercube. The neighborhood is 
        represented as coordinates on the hypercube and the associated update mapping of that neighborhood
        is the value of the hypercube at those coordinates.
    '''
    #R = neighborhood size
    R = 2*radius + 1
    A = alphabet_size
    #convert rule number to appropriate base using int2base function
    rule_number = int2base(rule, alphabet_size, radius)
    #build shape tuple to get corret shape for lookuptable ndarray from the given radius and alphabet size
    shape = ()
    for i in xrange(R):
        shape += (A,)
    #initialize empty ndarray for table with the correct shape
    table = np.zeros(shape, dtype = int)
    i = 0
    #scan is tuple (A, A-1, A-2, ..., 0) used to build neighborhoods in lexicographic order
    scan = tuple(np.arange(0,A)[::-1])
    #run loop through neighborhoods in lexicographic order and fill in the lookup table outputs
    #on the ndarray at coordinates corresponding to the neighborhood value
    for a in product(scan, repeat = R):
        table[a] = rule_number[i]
        i+=1
        
    return table

In [8]:
def neighborhood(data, radius):
    '''
    Returns list of arrays of neighborhood values for input data. Formatted to index the multidimensional 
    array lookup table.
    
    Parameters
    ----------
    data: array_like
        Input string of data.
        
    radius: int
        Radius of the neighborood.
        
    Returns
    -------
    neighborhood: list of arrays
        List of neighbor arrays formatted to index multidimensional array lookup table. Of the form
        [array(leftmost neighbors), ..., array(left nearest neighbors), array(data), array(right nearest
        neighbors), ..., array(rightmost neighbors)]
    '''
    #initialize lists for left enighbor values and right neighbor values
    left = []
    right = []
    #run loop through radial values up to the radius. left values are run in reverse so that both sides build
    #from inside out. r = 0 is skipped and added on in the last step
    for r in xrange(radius):
        left += [np.roll(data, radius-r)]
        right += [np.roll(data, -(r+1))]
    neighbors = left + [data] + right
    return neighbors

Below are functions for different initial conditions


In [9]:
def random_state(length, alphabet_size):
    '''
    Returns array of randomly selected states of given length with given alphabet size. 
    
    Parameters
    ----------
    length: int
        Lenght of the array
        
    alphabet_size: int
        Size of the alphabet to be randomly selected from in creating array.
        
    Returns
    -------
    state: array
        Array of given length of states randomly selected from {0,...,A -1} for alphabet size A. 
    '''
    state = np.random.choice(np.arange(0, alphabet_size), length)
        
    return state

In [10]:
def uniform_state(length, symbol):
    '''
    Returns homogeneous array of given symbol at given length
    
    Parameters
    ----------
    length: int
        Length of the array to be created.
        
    symbol: int
        Symbol which will uniformly populate the array.
        
    Returns
    -------
    out: array
        Homogeneous array of given length uniformly populated by given symbol.
    '''
    if symbol == 1:
        state = np.ones(length, dtype = int)
    elif symbol == 0:
        state = np.zeros(length, dtype = int)
    else:
        state = np.zeros(length, dtype = int)
        state[...] = symbol
        
    return state

In [11]:
def variable_density_state(length, p):
    state = np.zeros(length, dtype = int)
    for i in xrange(length):
        if np.random.rand() < p:
            state[i] = 1
    return state

In [12]:
def n_ones(length, n ):
    state = np.zeros(length, dtype = int)
    ind = np.arange(0,length,1)
    count = 0
    while count < n:
        test = np.random.choice(ind)
        if state[test] == 0: 
            state[test] =1
            count += 1
            
    return state

In [13]:
def domain_18(length, bias, wildcard = 'even'):
    state = np.zeros(length, dtype = int)
    for s in xrange(length):
        if wildcard == 'even':
            if s % 2 == 0 and np.random.rand() < bias:
                state[s] = 1
        elif wildcard == 'odd':
            if (s+1) % 2 == 0 and np.random.rand() < bias:
                state[s] = 1

    return state

Transducer Code


In [15]:
def transducer_18():
    '''
    Zero-wildcard transducer. Domain in ECA rule 18. 
    
    Returns
    -------
    Tuple: (transducer, start_state, synch_time)
        transducer: mapping of current machine state and string input to next machine state and output
        
        start_state: starting state of the transducer
        
        synch_time: number of transient steps needed for transducer to synchronize with input data 
    '''
    
    
    transducer = { ('a', 0):('a', 0) , ('a', 1):('b', 0) ,  ('b', 1):('b', 1) , ('b', 0):('c', 0) , \
                  ('c', 0):('b', 0) , ('c',1):('b',0)}
    start_state = 'a'
    synch_time = 0

    return (transducer, start_state, synch_time)

In [16]:
def transducer_54():
    '''
    Domain transducer for ECA rule 54. 8 state, period 2 domain.
    
    Returns
    -------
    Tuple: (transducer, start_state, synch_time)
        transducer: mapping of current machine state and string input to next machine state and output
        
        start_state: starting state of the transducer
        
        synch_time: number of transient steps needed for transducer to synchronize with input data. 
    '''
    
    transducer = { ('ta', 0):('tb', 0) , ('ta', 1):('tc', 0) ,  ('tb', 0):('td', 0) , ('tb', 1):('te', 0) , \
                  ('tc', 0):('tf', 0) , ('tc',1):('tg',0) , ('td', 0):('d',0) , ('td', 1):('a',0) , \
                 ('te', 0):('b',0) , ('te',1):('g',0) , ('tf', 0):('c', 0) , ('tf', 1):('f',0) , \
                 ('tg', 0):('h', 0) , ('tg', 1):('e', 0) ,    \
                 ('a', 0):('b', 0) , ('a', 1):('g', 1) , ('b', 0):('c', 0) , ('b', 1):('f', 2) , \
                 ('c', 0):('d', 0) , ('c', 1):('a', 3) , ('d', 0):('d', 4) , ('d', 1):('a', 0) , \
                 ('e', 0):('c', 5) , ('e', 1):('f', 0) , ('f', 0):('b', 6) , ('f', 1):('g', 0) , \
                 ('g' , 0):('e', 7) , ('g', 1):('h', 0) , ('h', 0):('e', 0) , ('h', 1):('h', 8)}
    
    start_state = 'ta'
    
    synch_time = 3
    
    return (transducer, start_state, synch_time)

In [17]:
class transducer(object):
    
    def __init__(self, d, start, synch):
        '''Instantiates transducer.
        
        Parameters
        ----------
        d : Python dict
            Tuple to Tuple mapping. Maps (current machine state, input symbol) to 
            (next machine state, output symbol). 
            
        start: str
            Start state for the transducer. 
            
        synch: int
            Number of synchronization steps needed to reach recurrent component of transducer.
            
            
        Instance Variables
        ------------------
        self._transducer:
            Python dict mapping current machine state and input symbol to next machine state and output symbol
            
        self._start_state:
            Start state for the machine.
            
        self._synch:
            Number of synchronization steps.
        '''
        

        
        self._transducer = d
        self._start = start
        self._synch = synch
            
            
            
            
            
            
    def scan(self, data, direction = 'right'):
        '''
        Takes data string as transducer input and returns corresponding transducer output
        
        Parameters
        ----------
        data: array_like 
            Input string to be scanned by transducer
            
        direction: str
            Direction 'right' or 'left' which transducer will scan the input data. Default to 'right'.
            
            
        Returns
        -------
        output: array
            Array of raw transducer output.
            
        '''
        length = np.size(data)
        output = np.zeros(length, dtype = int)
        
        #initialize machine state to start state value
        machine = self._start

        #run loop in appropriate direction to scan the data with the transducer using the trandsucers
        #dictionary / mapping, updating machine state and output at each step
        for c in xrange(length + self._synch):
            if direction == 'right':
                machine, output[c % length] = self._transducer[(machine, data[c % length])]

            elif direction == 'left':
                i = length - (c+1)
                machine, output[i % length] = self._transducer[(machine, data[i % length])]

        return output    
    
        

    
    
    
    
    def mask(self, data, alphabet_size, direction = 'right'):
        '''
        Takes data string as transducer input and returns corresponding transducer output overlayed
        ontop of original data, i.e. domains retain their structure. 
        
        Parameters
        ----------
        data: array_like 
            Input string to be scanned by transducer.
            
        direction: str
            Direction 'right' or 'left' which transducer will scan the input data. Default to 'right'.
            
            
        Returns
        -------
        output: array
            Array of transducer output overlayed ontop of original data. 
            
        '''
        length = np.size(data)
        mask = np.zeros(length, dtype = int)
        hold = np.copy(data)
        
        #initialize machine state
        machine = self._start

        #scan data with transducer, updating machine state and output each step
        for c in xrange(length + self._synch):
            if direction == 'right':
                machine, mask[c % length] = self._transducer[(machine, data[c % length])]

            elif direction == 'left':
                i = length - (c+1)
                machine, mask[i % length] = self._transducer[(machine, data[i % length])]
                    
        #clear non-domain values so mask can be simply added on top of original data, but not have 
        #interference with non-zero non-domain values.
        hold[mask != 0] = 0  
        
        #transducer output is zero for all state values in domain, so must add (alphabet_size - 1) 
        #onto transducer output to give unique values to defects above the original alphabet, which 
        #may be present in the domain.
        mask[mask != 0] += (alphabet_size - 1)
        
        return hold + mask
    
        

        
        
    def domain_filter(self, data, fill = False):
        '''
        Takes data string as transducer input and returns simplified transducer output with only two symbols;
        domain (0) or not domain (1).
        
        Parameters
        ----------
        data: array_like
            Input string to be scanned by transducer.
            
        fill: bool
            Set to True if there is a discrepency between left and right transducer scan. This will
            run both directions and fill-in between.
            False will return simplified output of single-direction scan. 
            
        
        Returns
        -------
        output: array
            Array of simplified transducer output.
        '''
        scanL = self.scan(data, 'left')
        scanR = self.scan(data, 'right')
        length = np.size(data)
        hold = np.zeros(length, dtype = int)
        
        if fill:
            go = False
            for c in xrange(length):

                if scanL[c] != 0 and go == False:
                    go = True
                    hold[c] = 1

                elif (scanL[c] == 0) and (scanR[c] == 0) and (go == True):
                    hold[c] = 1

                elif scanR[c] != 0 and scanL[c] == 0 and go == True:
                    hold[c] = 1
                    go = False
                    
        else:
            hold = np.copy(scanR)
            hold[hold != 0] = 1
                    
        return hold

In [ ]: